§object_store
This crate provides a uniform API for interacting with object storage services and local files via the ObjectStore trait.
Using this crate, the same binary and code can run in multiple clouds and local test environments, via a simple runtime configuration change.
§Highlights
- A high-performance async API focused on providing a consistent interface mirroring that of object stores such as S3
- Production quality, leading this crate to be used in large scale production systems, such as crates.io and InfluxDB IOx
- Support for advanced functionality, including atomic, conditional reads and writes, vectored IO, bulk deletion, and more…
- Stable and predictable governance via the Apache Arrow project
- Small dependency footprint, depending on only a small number of common crates
Originally developed by InfluxData and subsequently donated to Apache Arrow.
§Available ObjectStore Implementations
By default, this crate provides the following implementations:
- Memory: InMemory
- Local filesystem: LocalFileSystem
Feature flags are used to enable support for other implementations (a short example follows this list):
- gcp: Google Cloud Storage support. See GoogleCloudStorageBuilder
- aws: Amazon S3. See AmazonS3Builder
- azure: Azure Blob Storage. See MicrosoftAzureBuilder
- http: HTTP/WebDAV Storage. See HttpBuilder
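Once a feature is enabled, the corresponding builder follows the same pattern as the other stores. A minimal sketch, assuming the gcp feature is enabled, credentials are available in the environment, and a hypothetical bucket name:

use object_store::gcp::GoogleCloudStorageBuilder;

// Requires the `gcp` feature; "my-bucket" is a hypothetical bucket name
let gcs = GoogleCloudStorageBuilder::from_env()
    .with_bucket_name("my-bucket")
    .build()
    .unwrap();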
§Why not a Filesystem Interface?
The ObjectStore interface is designed to mirror the APIs of object stores and not filesystems, and thus has stateless APIs instead of cursor based interfaces such as Read or Seek available in filesystems.
This design provides the following advantages:
- All operations are atomic, and readers cannot observe partial and/or failed writes
- Methods map directly to object store APIs, providing both efficiency and predictability
- Abstracts away filesystem and operating system specific quirks, ensuring portability
- Allows for functionality not native to filesystems, such as operation preconditions and atomic multipart uploads
This crate does provide BufReader and BufWriter adapters, which offer a more filesystem-like API for working with the ObjectStore trait; however, they should be used with care.
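A minimal sketch of these adapters, assuming the buffered module exported by this crate and using an InMemory store with a hypothetical path for illustration:

use std::sync::Arc;

use object_store::buffered::{BufReader, BufWriter};
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = Path::from("data/file1"); // hypothetical path

// Write through a cursor-style AsyncWrite interface
let mut writer = BufWriter::new(Arc::clone(&store), path.clone());
writer.write_all(b"hello").await.unwrap();
writer.shutdown().await.unwrap(); // flushes and completes the upload

// Read back through a cursor-style AsyncRead interface
let meta = store.head(&path).await.unwrap();
let mut reader = BufReader::new(Arc::clone(&store), &meta);
let mut contents = String::new();
reader.read_to_string(&mut contents).await.unwrap();
assert_eq!(contents, "hello");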
§Adapters
ObjectStore instances can be composed with various adapters which add additional functionality (a short example follows this list):
- Rate Throttling: ThrottleConfig
- Concurrent Request Limit: LimitStore
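A minimal sketch of composing these adapters, assuming the throttle and limit modules exported by this crate and an InMemory store for illustration:

use std::sync::Arc;
use std::time::Duration;

use object_store::limit::LimitStore;
use object_store::memory::InMemory;
use object_store::throttle::{ThrottleConfig, ThrottledStore};
use object_store::ObjectStore;

// Add a fixed 100ms of latency to every GET request (values are illustrative)
let config = ThrottleConfig {
    wait_get_per_call: Duration::from_millis(100),
    ..Default::default()
};
let throttled = ThrottledStore::new(InMemory::new(), config);

// Additionally cap the wrapped store at 16 concurrent requests
let store: Arc<dyn ObjectStore> = Arc::new(LimitStore::new(throttled, 16));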
§Configuration System
This crate provides a configuration system inspired by the APIs exposed by fsspec, PyArrow FileSystem, and Hadoop FileSystem, allowing a DynObjectStore to be created from a URL and an optional list of key value pairs. This provides a flexible interface to support a wide variety of user-defined store configurations, with minimal additional application complexity.
// Can manually create a specific store variant using the appropriate builder
let store: AmazonS3 = AmazonS3Builder::from_env()
    .with_bucket_name("my-bucket")
    .build()
    .unwrap();
// Alternatively can create an ObjectStore from an S3 URL
let url = Url::parse("s3://bucket/path").unwrap();
let (store, path) = parse_url(&url).unwrap();
assert_eq!(path.as_ref(), "path");
// Potentially with additional options
let (store, path) = parse_url_opts(&url, vec![("aws_access_key_id", "...")]).unwrap();
// Or with URLs that encode the bucket name in the URL path
let url = Url::parse("https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket/path").unwrap();
let (store, path) = parse_url(&url).unwrap();
assert_eq!(path.as_ref(), "path");
§List objects
Use the ObjectStore::list method to iterate over objects in remote storage or files in the local filesystem:
// create an ObjectStore
let object_store: Arc<dyn ObjectStore> = get_object_store();
// Recursively list all files below the 'data' path.
// 1. On AWS S3 this would be the 'data/' prefix
// 2. On a local filesystem, this would be the 'data' directory
let prefix = Path::from("data");
// Get an `async` stream of Metadata objects:
let mut list_stream = object_store.list(Some(&prefix));
// Print a line about each object
while let Some(meta) = list_stream.next().await.transpose().unwrap() {
    println!("Name: {}, size: {}", meta.location, meta.size);
}
This will print out something like the following:
Name: data/file01.parquet, size: 112832
Name: data/file02.parquet, size: 143119
Name: data/child/file03.parquet, size: 100
...
§Fetch objects
Use the ObjectStore::get method to fetch the data bytes from remote storage or files in the local filesystem as a stream.
// Create an ObjectStore
let object_store: Arc<dyn ObjectStore> = get_object_store();
// Retrieve a specific file
let path = Path::from("data/file01.parquet");
// Fetch just the file metadata
let meta = object_store.head(&path).await.unwrap();
println!("{meta:?}");
// Fetch the object including metadata
let result: GetResult = object_store.get(&path).await.unwrap();
assert_eq!(result.meta, meta);
// Buffer the entire object in memory
let object: Bytes = result.bytes().await.unwrap();
assert_eq!(object.len(), meta.size);
// Alternatively stream the bytes from object storage
let stream = object_store.get(&path).await.unwrap().into_stream();
// Count the '0's using `try_fold` from `TryStreamExt` trait
let num_zeros = stream
    .try_fold(0, |acc, bytes| async move {
        Ok(acc + bytes.iter().filter(|b| **b == 0).count())
    })
    .await
    .unwrap();
println!("Num zeros in {} is {}", path, num_zeros);
§Put Object
Use the ObjectStore::put method to atomically write data.
let object_store: Arc<dyn ObjectStore> = get_object_store();
let path = Path::from("data/file1");
let payload = PutPayload::from_static(b"hello");
object_store.put(&path, payload).await.unwrap();
§Multipart Upload
Use the ObjectStore::put_multipart method to atomically write a large amount of data.
let object_store: Arc<dyn ObjectStore> = get_object_store();
let path = Path::from("data/large_file");
let upload = object_store.put_multipart(&path).await.unwrap();
let mut write = WriteMultipart::new(upload);
write.write(b"hello");
write.finish().await.unwrap();
§Vectored Read
A common pattern, especially when reading structured datasets, is to need to fetch multiple, potentially non-contiguous, ranges of a particular object.
ObjectStore::get_ranges provides an efficient way to perform such vectored IO, and will automatically coalesce adjacent ranges into an appropriate number of parallel requests.
let object_store: Arc<dyn ObjectStore> = get_object_store();
let path = Path::from("data/large_file");
let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
assert_eq!(ranges.len(), 3);
assert_eq!(ranges[0].len(), 10);
§Vectored Write
When writing data it is often the case that the size of the output is not known ahead of time.
A common approach to handling this is to bump-allocate a Vec, whereby the underlying allocation is repeatedly reallocated, each time doubling the capacity. The performance of this is suboptimal, as reallocating memory will often involve copying it to a new location. Fortunately, as PutPayload does not require memory regions to be contiguous, it is possible to instead allocate memory in chunks and avoid bump allocating. PutPayloadMut encapsulates this approach.
let object_store: Arc<dyn ObjectStore> = get_object_store();
let path = Path::from("data/large_file");
let mut buffer = PutPayloadMut::new().with_block_size(8192);
for _ in 0..22 {
    buffer.extend_from_slice(&[0; 1024]);
}
let payload = buffer.freeze();
// Payload consists of 3 separate 8KB allocations
assert_eq!(payload.as_ref().len(), 3);
assert_eq!(payload.as_ref()[0].len(), 8192);
assert_eq!(payload.as_ref()[1].len(), 8192);
assert_eq!(payload.as_ref()[2].len(), 6144);
object_store.put(&path, payload).await.unwrap();
§Conditional Fetch
More complex object retrieval can be supported by ObjectStore::get_opts. For example, a cache can be efficiently refreshed without re-fetching the entire object data if the object hasn’t been modified.
struct CacheEntry {
    /// Data returned by last request
    data: Bytes,
    /// ETag identifying the object returned by the server
    e_tag: String,
    /// Instant of last refresh
    refreshed_at: Instant,
}

/// Example cache that checks entries after 10 seconds for a new version
struct Cache {
    entries: HashMap<Path, CacheEntry>,
    store: Arc<dyn ObjectStore>,
}

impl Cache {
    pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
        Ok(match self.entries.get_mut(path) {
            Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
                true => e.data.clone(), // Return cached data
                false => { // Check if remote version has changed
                    let opts = GetOptions {
                        if_none_match: Some(e.e_tag.clone()),
                        ..GetOptions::default()
                    };
                    match self.store.get_opts(&path, opts).await {
                        Ok(d) => e.data = d.bytes().await?,
                        Err(Error::NotModified { .. }) => {} // Data has not changed
                        Err(e) => return Err(e),
                    };
                    e.refreshed_at = Instant::now();
                    e.data.clone()
                }
            },
            None => { // Not cached, fetch data
                let get = self.store.get(&path).await?;
                let e_tag = get.meta.e_tag.clone();
                let data = get.bytes().await?;
                if let Some(e_tag) = e_tag {
                    let entry = CacheEntry {
                        e_tag,
                        data: data.clone(),
                        refreshed_at: Instant::now(),
                    };
                    self.entries.insert(path.clone(), entry);
                }
                data
            }
        })
    }
}
§Conditional Put
The default behaviour when writing data is to upsert any existing object at the given path, overwriting any previous value. More complex behaviours can be achieved using PutMode, which can be used to build Optimistic Concurrency Control based transactions. This facilitates building metadata catalogs, such as Apache Iceberg or Delta Lake, directly on top of object storage, without relying on a separate DBMS.
let store = get_object_store();
let path = Path::from("test");
// Perform a conditional update on path
loop {
    // Perform get request
    let r = store.get(&path).await.unwrap();

    // Save version information fetched
    let version = UpdateVersion {
        e_tag: r.meta.e_tag.clone(),
        version: r.meta.version.clone(),
    };

    // Compute new version of object contents
    let new = do_update(r.bytes().await.unwrap());

    // Attempt to commit transaction
    match store.put_opts(&path, new.into(), PutMode::Update(version).into()).await {
        Ok(_) => break, // Successfully committed
        Err(Error::Precondition { .. }) => continue, // Object has changed, try again
        Err(e) => panic!("{e}"),
    }
}
§TLS Certificates
Stores that use HTTPS/TLS (this is true for most cloud stores) can choose the source of their CA certificates. By default the system-bundled certificates are used (see rustls-native-certs). The tls-webpki-roots feature switch can be used to also bundle Mozilla’s root certificates with the library/application (see webpki-roots).
Modules§
- An object store implementation for S3
- An object store implementation for Azure blob storage
- Utilities for performing tokio-style buffered IO
- A ChunkedStore that can be used to test streaming behaviour
- Utility for streaming newline delimited files from object storage
- An object store implementation for Google Cloud Storage
- An object store implementation for generic HTTP servers
- Integration tests for custom object store implementations
- An object store that limits the maximum concurrency of the wrapped implementation
- An object store implementation for a local filesystem
- An in-memory object store implementation
- Cloud Multipart Upload
- Path abstraction for Object Storage
- An object store wrapper handling a constant path prefix
- Abstraction of signed URL generation for those object store implementations that support it
- A throttling object store wrapper
Structs§
- The value of an Attribute
- Additional attributes of an object
- Iterator over Attributes
- Exponential backoff with decorrelated jitter algorithm
- Represents a CA certificate provided by the user.
- HTTP client configuration for remote object stores
- Options for a get request, such as range
- Result for a get request
- Result of a list call that includes objects, prefixes (directories) and a token for the next set of results. Individual result sets may be limited to 1,000 objects based on the underlying object storage’s limitations.
- The metadata that describes an object.
- Options for ObjectStore::put_multipart_opts
- Options for a put request
- A cheaply cloneable, ordered collection of Bytes
- An owning iterator of PutPayload
- An iterator over PutPayload
- A builder for PutPayload that avoids reallocating memory
- Result for a put request
- The configuration for how to respond to request errors
- A static set of credentials
- A collection of key value pairs used to annotate objects
- Uniquely identifies a version of an object to update
- A synchronous write API for uploading data in parallel in fixed size chunks
Enums§
- Additional object attribute types
- Configuration keys for ClientOptions
- A specialized Error for object store-related errors
- Request only a portion of an object’s bytes
- The kind of a GetResult
- Recognizes various URL formats, identifying the relevant ObjectStore
- Configure preconditions for the put operation
Constants§
- Range requests with a gap less than or equal to this will be coalesced into a single request by coalesce_ranges
Traits§
- Provides credentials for use when signing requests
- A trait allowing writing an object in fixed size chunks
- Universal API to multiple object store services.
Functions§
- Takes a function fetch that can fetch a range of bytes and uses this to fetch the provided byte ranges
- Collect a stream into Bytes, avoiding copying in the event of a single chunk
- Create an ObjectStore based on the provided url
- Create an ObjectStore based on the provided url and options
Type Aliases§
- An alias for a dynamically dispatched object store implementation.
- Id type for multipart uploads.
- A specialized Result for object store-related errors
- An upload part request